package com.niit.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Spark_Stream_Transform {

  /**
   * Streaming word count that demonstrates `DStream.transform`.
   *
   * Reads lines from a TCP socket on localhost:9999 (feed it with e.g.
   * `nc -lk 9999`), counts words per 3-second micro-batch, and prints the
   * counts to stdout. Runs until externally terminated.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Suppress Spark's INFO/WARN noise so the periodic print() output is readable.
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("localhost", 9999)

    // transform vs map:
    //  - the function passed to transform runs periodically on the DRIVER,
    //    once per batch, and gets the batch's underlying RDD — so the full
    //    RDD API (flatMap/map/reduceByKey, joins, etc.) is available;
    //  - the RDD operations built inside it still execute on the EXECUTORS.
    // Plain DStream.map only sees individual elements and cannot express a
    // per-batch reduceByKey this way.
    val resWord: DStream[(String, Int)] = lines.transform { rdd =>
      val words: RDD[String] = rdd.flatMap(_.split(" "))
      val wordOne: RDD[(String, Int)] = words.map((_, 1))
      val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
      wordCount
    }

    // print() is side-effecting, so the empty parameter list is kept.
    resWord.print()

    ssc.start()
    ssc.awaitTermination()
  }

}
